Testing SparkMonitor Extension

The extension provides a SparkConf configuration object and adds it to the notebook namespace as 'conf'.
The user passes this object to the SparkContext.


In [ ]:
# Print every key/value pair currently held by `conf`, the SparkConf instance
# that the extension placed in the namespace, so its preset options are visible.
print(conf.toDebugString()) #Instance of SparkConf with options set by the extension

The user adds other options and starts the Spark context.


In [ ]:
from pyspark import SparkContext

# Name the application and point it at a local master ('local[*]').
# If a standalone master was started from the command line instead, pass its
# URL here, e.g. 'spark://dell-inspiron:7077'.
conf.setAppName('ExtensionTestingApp').setMaster('local[*]')

# Start the Spark context using the extension-provided configuration.
sc = SparkContext.getOrCreate(conf=conf)

Example Spark job


In [ ]:
import time

# Shared variables: a broadcast value readable on all executors and an
# accumulator used to add values across executors.
b = sc.broadcast([3, 5])
a = sc.accumulator(0)

# Two small RDDs built from in-memory Python lists.
RDD0 = sc.parallelize(list(range(5)))
RDD2 = sc.parallelize(list(range(10, 15)))

# First branch: all (RDD0, RDD2) pairs, each summed with the first broadcast value.
RDD1 = RDD0.cartesian(RDD2)
cached = RDD2.cache()  # exercise a cached RDD
RDD22 = RDD1.map(lambda pair: pair[0] + pair[1] + b.value[0])
RDD3 = RDD22.repartition(5)  # repartition to trigger a new stage

# Second branch: a linear transform of RDD2.
RDD4 = RDD2.map(lambda value: 3 * value - b.value[0])

# Keep the even results of the first branch and the odd results of the second.
RDD5 = RDD3.filter(lambda n: n % 2 == 0)
RDD6 = RDD4.filter(lambda n: n % 2 != 0)

# Pair the two branches, flatten each pair back into its two numbers, and
# append the cached RDD's elements.
RDD7 = RDD5.cartesian(RDD6)
RDD8 = RDD7.flatMap(lambda pair: [pair[0], pair[1]])
RDD9 = RDD8.union(cached)

# First action: sum every element of the combined RDD.
ans = RDD9.reduce(lambda left, right: left + right)
print(ans)


def f(x):
    # Sleep per element so the job runs a little longer, then add the
    # element to the accumulator.
    global a
    time.sleep(0.5)
    a += x


# Second action: accumulate every element, then read the total on the driver.
RDD9.foreach(f)
print(a.value)
# Display should appear automatically

In [ ]:
# Run five identical count() jobs one after another from a single cell.
# Indexing with [-1] makes the last job's count the cell's displayed value,
# exactly as when the five calls are written out as separate statements.
[sc.parallelize(range(0, 100)).count() for job_number in range(5)][-1]

In [ ]:
# Run the same map -> filter -> count job four times in a row from one cell:
# square 0..99, keep the even squares, and count them. Indexing with [-1]
# makes the last job's count the cell's displayed value.
[
    sc.parallelize(range(0, 100))
    .map(lambda n: n * n)
    .filter(lambda square: square % 2 == 0)
    .count()
    for job_number in range(4)
][-1]

In [ ]:
# Stop the Spark context now that the test jobs have been run.
sc.stop()